# -*- coding: utf-8 -*-
from kafka import KafkaConsumer
from kafka import KafkaConsumer
import random
import pandas as pd


def main():
    """Dump tab-separated records from the 'orders' Kafka topic to a CSV file.

    Each message value is decoded as UTF-8 and split on tab characters.
    Messages that do not split into exactly as many fields as there are
    output columns are skipped. Accepted rows are accumulated in memory and
    written once to ``/training/kafka_messages.csv`` (header row, no index
    column) after the row limit is reached.

    The consumer is closed even if consuming or parsing raises, so the
    broker connection is not left open.

    NOTE(review): no ``consumer_timeout_ms`` is passed, so (by kafka-python's
    documented default) iteration waits for new messages indefinitely; if the
    topic never yields enough well-formed messages the CSV is never written.
    Confirm whether a timeout is wanted.
    """
    columns = ['user_id', 'order_category', 'order_name', 'order_quantity',
               'order_date', 'order_score', 'status']
    # The original counter started at 1 and stopped on reaching 10000, i.e.
    # after 9999 accepted rows. That limit is kept unchanged here.
    max_rows = 9999

    consumer = KafkaConsumer(
        'orders',
        bootstrap_servers='123.56.187.176:9092',
        # A random suffix gives every run its own consumer group id.
        group_id='new_test_group' + str(random.randint(1, 100000)),
        auto_offset_reset='earliest',
        value_deserializer=lambda x: x.decode('utf-8')
    )

    # Collect rows in a plain list and build the DataFrame once at the end:
    # DataFrame.append was removed in pandas 2.0, and on older versions it
    # copied the whole frame on every call (quadratic in the row count).
    rows = []
    try:
        # Consume messages.
        for message in consumer:
            data = message.value.split("\t")
            if len(data) != len(columns):
                # Malformed record: wrong number of fields, skip it.
                continue
            rows.append(data)
            print(f"Appended to DataFrame: {data}")
            if len(rows) >= max_rows:
                break
    finally:
        consumer.close()

    df = pd.DataFrame(rows, columns=columns)
    df.to_csv('/training/kafka_messages.csv', index=False)
    print("Data has been written to kafka_messages.csv")


# Run main() only when this file is executed as a script, not when it is
# imported as a module.
if __name__ == "__main__":
    main()

